---
title: 09 连接池
---

## 问题

[教程 04](/tutorial/04-routing) 中我们介绍过 *muxHTTP* 中有一个连接管理器，键值对的存储。值为连接，这里的连接实际上是指向到目标 pipeline 的，目标 pipeline 中使用的 *connect* 过滤器维护着到上游的连接。在前几次的教程，我们使用目标服务器地址作为键。这就意味着，每个上游都只有一个连接。pipy 的 *muxHTTP* 为 HTTP1 提供了多路复用的支持，但是同一个下游连接的请求和响应的按顺序传输，不可避免存在着性能的问题。

我们可以测试一下。pipy 有个功能类似内存的 dump，只要向 pipy 进程发送 *SIGTSTP* 信号即可打印出可读的信息，比如加载的类和实例的数量、管道的数量等等。

我们使用 *wrk* 使用 5 个连接发送一些请求： `wrk -t 1 -c 5 -d 2s --latency http://localhost:8000/ip`。请求结束我们向 pipy 进程发送 *SIGTSTP* 信号，比如在 pipy 命令行窗口按 *ctrl+z*。

在管道一栏信息里，我们看到 *connection* 管道总实例数和活跃实例数均为 1，这说明到上游只使用了一个连接。

```
PIPELINE                             #ALLOCATED  #ACTIVE
----------------------------------------------------------
  /plugins/balancer.js [connection]  1           1
```

通常的做法，这里可以使用连接池来管理到上游的连接，这次我们就来实现连接池的功能。

## ResourcePool

Pipy  在 `algo` 包中提供了资源池类型 `ResourcePool`，它接受一个函数（资源生成器）作为构造参数。当资源池中没有空闲资源时，会调用该函数分配新的资源。

```js
new algo.ResourcePool(
    () => '' //return new resource
)
```

`ResourcePool` 对外提供了两个方法：

* `allocate(pool)`：返回指定池中的资源，参数为池的名字。
* `free(resource)`：释放资源，将资源放回池中，参数为要释放的资源。

看到这里，大概可以想到：每个上游服务都有一个池，池中放着 *muxHTTP* 要用的键。那么键如何生成呢？也就是资源生成器如何实现。这里我们使用变量递增的方式，保证每次创建的“资源”都不同：

```js
  pipy({
    _services: (
      Object.fromEntries(
        Object.entries(config.services).map(
          ([k, v]) => [
            k, new algo.RoundRobinLoadBalancer(v)
          ]
        )
      )
    ),

    _balancer: null,
    _balancerCache: null,
    _target: '',

+   _g: {
+     connectionID: 0,
+   },

+   _connectionPool: new algo.ResourcePool(
+     () => ++_g.connectionID
+   ),
  })
```

我们引入两个新的变量 `_g` 和 `_connectionPool`。细心的你可能会问用于递增的变量为什么要放在 `_g` 中呢？

`pipy()` 的参数是整个 pipy 实例的“根上下文”，当某些管道连接器在连接子管道时会从“根上下文”复制一个新的上下文，并在新的上下文中对全局变量进行操作，比如 “use”、“link”连接器。与之相反的是 “merge”连接器，会在原来的上下文中操作全局变量，这个会在后面详细介绍。

假如这里直接使用 `connectionID` 作为全局变量，初始值为 *0*。在 “use”连接的子管道中对全局变量“connectionID”进行递增操作，其值并**不会反映到“根上下文”中的变量上**。后续其他连接过来的请求，甚至同一个连接过来的请求，对该值都是**不可见的**。

但是将 `connectionID` 放在另外一个变量 `_g` 中，子管道拿到的是 `_g` 指向的对象/内存地址。一个请求对 `connectionID` 做了修改，其他请求也会读到修改后的值。

> 这就是某些语言中所说的传值、传地址的区别。

回到开头的测试，有了资源池之后，下游的 5 个连接，池（只有 1 个上游目标服务器，因此只有 1 个池）中会有 5 个资源：1、2、3、4、5。某个下游连接在从池中获取资源时，获取到可能是其中的任何一个。但我们希望使用固定的同一个到上游的连接，正如我们在[上一篇](/tutorial/08-load-balancing-improved) 中实现的：同一个下游连接的请求，都可以路由到同一个上游目标服务器。

同样，我们使用缓存来解决。

## Cache

```js
  pipy({
    _services: (
      Object.fromEntries(
        Object.entries(config.services).map(
          ([k, v]) => [
            k, new algo.RoundRobinLoadBalancer(v)
          ]
        )
      )
    ),

    _balancer: null,
    _balancerCache: null,
    _target: '',
+   _targetCache: null,

    _g: {
      connectionID: 0,
    },

    _connectionPool: new algo.ResourcePool(
      () => ++_g.connectionID
    ),
  })

  .pipeline('session')
    .handleStreamStart(
      () => (
+       _targetCache = new algo.Cache(
+         // k is a target, v is a connection ID
+         (k  ) => _connectionPool.allocate(k),
+         (k,v) => _connectionPool.free(v),
+       ),
        _balancerCache = new algo.Cache(
          // k is a balancer, v is a target
          (k  ) => k.select(),
          (k,v) => k.deselect(v),
        )        
      )
    )
    .handleStreamEnd(
      () => (
-       _targetCache.clear(),
+       _balancerCache.clear()
      )
    )
```

我们引入了新的缓存 `_targetCache`，并为其提供构造参数：从资源池中申请资源、将资源放回池中。同样，也需要在下游连接断开（*StreamEnd*）时对缓存进行清理。

接下来也要调整 *muxHTTP*  过滤器键的获取方式：

```js
  .pipeline('forward')
    .muxHTTP(
      'connection',
-     () => _target
+     () => _targetCache.get(_target)
    )
```

##  测试

使用开头的测试方式进行测试，并 dump 信息，可以看到 *connection* 管道的总实例数和活跃实例数均为 5，正好对应下游的连接数。

```
PIPELINE                                  #ALLOCATED  #ACTIVE
---------------------------------------------------------------
  /plugins/balancer.js [connection]       5           5
```

## 总结

这次我们学习了如何使用资源池类型 `ResourcePool` 结合 *muxHTTP* 过滤器实现连接池的功能。本次新增的代码行数不多，但是本篇的篇幅比较长，是为了加深对全局变量、*muxHTTP* 的理解。

### 收获

1. 使用 `ResourcePool` 实现资源池的功能。
2. 对需要跨下游连接的变量，需要将其作为全局变量的一个字段。

### 接下来

标准的路由功能，还需要能对路径进行灵活的处理。比如路由表中的 `/ip`，这个路径在上游服务中并不存在。下面我们就实现解决这种问题的重定向功能。